Revision 794 (by ahitrov, 2020/05/27 11:08:12) Bug

package sphinx::Keeper;

use strict;
use warnings 'all';
use base qw(Contenido::Keeper);
use Contenido::Globals;
use LWP::UserAgent;
use JSON::XS;
use Data::Dumper;

my $ua = LWP::UserAgent->new;
$ua->timeout(1);


######################
#  Отправить объект в поиск:
#       Вызов: $keeper->set_search( $object );
#       Объект должен обязательно иметь метод 
#	->get_search_data,
#       возвращающий структуру:
#       {
#               id      => $object->id,
#               class   => $object->class,
#               name    => $object_name,
#               text    => $search_text
#       }
#	Кроме того, метод чувствителен к полю status и предполагает,
#	что status=1 - документ активен; status=0 - документ не активен.
##########################################################################
sub send
{
	my $self = shift;
	my $doc = shift;
	return undef	unless ref $doc && $doc->id;

	my ($object) = $keeper->get_documents(
			class   => 'sphinx::Search',
			object_id       => $doc->id,
			object_class    => $doc->class,
		);
	if ( $doc->status ) {
		my $data = $doc->get_search_data;
		if ( ref $object && !$data ) {
			$object->delete;
			return undef;
		}
		unless ( ref $object ) {
			$object = sphinx::Search->new( $keeper );
			$object->status( 1 );
			$object->is_deleted( 0 );
			$object->object_id( $doc->id );
			$object->object_class( $doc->class );
			$object->name( $data->{name} );
			$object->search( $data->{text} );
			$object->store;
		} else {
			if ( $data->{name} ne $object->name || $data->{text} ne $object->search || $object->is_deleted || $object->status <= 0 ) {
				$object->status( 1 );
				$object->is_deleted( 0 );
				$object->name( $data->{name} );
				$object->search( $data->{text} );
				$object->store;
			}
		}
	} else {
		if ( ref $object ) {
#			$object->status( 0 );
#			$object->is_deleted( 1 );
#			$object->store;
			$object->delete;
		}
	}
}


# Методы поиска
####################################################################
sub proxy {
    my $self = shift;
    my $text = shift;
    return	unless $text;
    my $opts = shift // {};

    my $db_table = delete $opts->{db_table} || $self->state->table_name;
    my $uri = URI->new( 'http://'.$self->state->db_host.':'.$self->state->db_port.'/search/'.$db_table );
    $uri->query_param( search => $text );
    if ( exists $opts->{object_class} ) {
	if ( ref $opts->{object_class} eq 'ARRAY' ) {
		$uri->query_param( class => $opts->{object_class} );
	} else {
		$uri->query_param( class => $opts->{object_class} );
	}
    }
    if ( exists $opts->{order_by} ) {
	my @Orders;
	if ( ref $opts->{order_by} eq 'ARRAY' ) {
		foreach my $order ( @{$opts->{order_by}} ) {
			if ( ref $order eq 'ARRAY' && scalar @$order > 0 &&  scalar @$order <= 2 ) {
				if ( scalar @$order == 2 && lc($order->[1]) eq 'desc' ) {
					push @Orders, { field => $order->[0], reverse => 'desc' };
				} else {
					push @Orders, { field => $order->[0] };
				}
			} elsif ( $order && !ref $order ) {
				my @parts = split /\s*,\s+/, $order;
				if ( scalar @parts == 2 && lc($parts[1]) eq 'desc' ) {
					push @Orders, { field => $parts[0], reverse => 'desc' };
				} else {
					push @Orders, { field => $parts[0] };
				}
			}
		}
	} else {
		my @orders = split /\s*,\s+/, $opts->{order_by};
		foreach my $order ( @orders ) {
			if ( $order =~ /^([\w\.]+)\s+(asc|desc)/i ) {
				push @Orders, { field => $1, reverse => $2 };
			} else {
				 push @Orders, { field => $order };
			}
		}
	}
	my $i = 1;
	foreach my $order ( @Orders ) {
		$uri->query_param( "order_$i" => $order->{field} );
		if ( exists $order->{reverse} && lc($order->{reverse}) eq 'desc' ) {
			$uri->query_param( "sort_$i" => 'desc' );
		}
		last	if ++$i > 5;
	}
    }
    if ( exists $opts->{limit} ) {
	$uri->query_param( limit => $opts->{limit} );
    }
    if ( exists $opts->{offset} ) {
	$uri->query_param( offset => $opts->{offset} );
    }
    warn "Ask sphinx: $uri\n"				if $DEBUG;
    my $res = $ua->get($uri);
    if ($res->is_success) {
	my $body = $res->decoded_content;
	warn "Response: [$body]\n"			if $DEBUG;
	my $result = decode_json $body;
	return $result;
    } else {
	warn "SPHINX PROXY error! Code: ".$res->code.". Status: ".$res->status_line."\n";
    }
}

sub search {
    my $self = shift;
    my $text = shift;
    return	unless $text;
    my (%opts) = @_;

    my $result;
    my $db_table = delete $opts{db_table} || $self->state->table_name;
    my @wheres = ("MATCH(?)");
    my @values = ($text);
    my @orders;
    my $count = delete $opts{count};
    my $limit = delete $opts{limit};
    return	if $limit && ($limit =~ /\D/ || $limit < 0);
    my $offset = delete $opts{offset};
    return	if $offset && ($offset =~ /\D/ || $offset < 0);
    my $no_limit = delete $opts{no_limit};
    unless ( $no_limit ) {
	$limit ||= 1000;
    }
    if ( $count ) {
	$limit = 0;
	$offset = 0;
    }
    my $weights = delete $opts{weights};
    my $return_value = delete $opts{return_value} || 'array_ref';
    my $hash_by = delete $opts{hash_by} || 'object_id';

    if ( exists $opts{object_class} ) {
	push @wheres, "object_class in (".join( ',', map { '?' } ref $opts{object_class} eq 'ARRAY' ? @{$opts{object_class}} : ($opts{object_class}) ).")";
	push @values, ref $opts{object_class} ? @{$opts{object_class}} : $opts{object_class};
    }

    if ( exists $opts{order_by} ) {
	push @orders, $opts{order_by};
    } elsif ( !$count ) {
	push @orders, 'weight desc, last_edited desc';
    }

    my $query = "SELECT ".($count ? 'count(*) as cnt' : '*, weight() as weight')." FROM $db_table WHERE ".join( ' AND ', @wheres ).(@orders ? " ORDER BY ".join(', ', @orders) : '');
    if ( $limit && $offset ) {
	$query .= " LIMIT $offset, $limit ";
    } elsif ( $limit ) {
	$query .= " LIMIT 0, $limit ";
    }
    if ( ref $weights eq 'HASH' ) {
	my @weights;
	foreach my $field ( keys %$weights ) {
		push @weights, "$field=".$weights->{$field};
	}
	if ( @weights ) {
		$query .= " OPTION field_weights=(".join(', ', @weights).") ";
	}
    }
    warn "SEARCH QUERY: $query\n"		if $DEBUG;
    warn "SEARCH VALUES: ".Dumper( \@values )	if $DEBUG;
    my $sth = $self->SQL->prepare( $query );
    if ( $sth->execute( @values ) ) {
	if ( $count ) {
		$result = $sth->fetchrow_arrayref;
		warn "COUNT: ". Dumper( $result )	if $DEBUG;
		$result = $result->[0];
	} else {
		$result = [];
		while ( my $row = $sth->fetchrow_hashref ) {
			push @$result, $row;
		}
	}
    } else {
	warn "ERROR in statement: ".$sth->errstr."\n";
	warn "SEARCH QUERY: $query\n";
	warn "SEARCH VALUES: ".Dumper( \@values );
    }
    $sth->finish;
    return $result;
}

sub stemmed {
    my $self = shift;
    my $db_table = $self->state->table_name_stemmed;
    return $self->search( @_, db_table => $db_table );
}

# МЕТОДЫ ДОСТУПА К СОЕДИНЕНИЯМ С БАЗОЙ УМНЫЕ
####################################################################
# получение соединения с базой или установка нового если его не было
sub SQL {
    my $self = shift;
    return ($self->connect_check() ? $self->{SQL} : undef);
}

# -------------------------------------------------------------------------------------------------
# Открываем соединение с базой данных
# -------------------------------------------------------------------------------------------------
sub connect {
    my $self = shift;
    #соединение уже есть
    if ($self->is_connected) {
    } else {
	unless ($self->{SQL} = $self->db_connect) {
		warn "Не могу соединиться с базой данных";
		die;
	}
	$self->{SQL}->do("SET NAMES '".$self->state->db_client_encoding."'") if ($self->state->db_client_encoding);
    }

    $self->{_connect_ok} = 1;
    return 1;
}

#проверка соединения с базой кеширующая состояние соединения
sub connect_check {
    my $self = shift;
    return 1 if ($self->{_connect_ok});
    if ($self->is_connected) {
	$self->{_connect_ok} = 1;
	return 1;
    } else {
	if ($self->connect) {
		return 1;
	} else {
		#сюда по логике попадать не должно так как die вылететь должен
		warn "Connect failed\n";
		return 0;
	}
    }
}

sub db_connect {
    my $self = shift;
    my $dbh = DBI->connect('DBI:mysql:host='.$self->{db_host}.';port='.$self->{db_port}.';mysql_enable_utf8=1')
		|| die "Contenido Error: Не могу соединиться с Sphinx базой данных\n";

    $dbh->{mysql_auto_reconnect} = 1;
#    $dbh->{'AutoCommit'} = 1;

    return $dbh;
}

sub is_connected {
    my $self = shift;
    if ( ref $self->{SQL} and $self->{SQL}->can('ping') and $self->{SQL}->ping() ) {
	$self->{_connect_ok} = 1;
	return 1;
    } else {
	$self->{_connect_ok} = 0;
	return 0;
    }

#	warn 'Check if MySQL DB connected: '.(ref $self && exists $self->{SQL} && ref $self->{SQL} ? 1 : 0 )    if $DEBUG;
#	return ( ref($self) && exists $self->{SQL} && ref $self->{SQL} );
}
1;